
fix(agent-worker): propagate runId + runJobToken through JobEventSchema#874

Merged
buremba merged 1 commit into
mainfrom
fix/dispatch-propagate-run-fields
May 18, 2026

@buremba
Member

@buremba buremba commented May 18, 2026

Summary

PR #871 flipped the LOBU_SESSION_STORE default to snapshot mode. PR #865 added a startup assertion that throws if snapshot mode is on but WorkerConfig.runId is missing. Together they broke every Telegram chat in prod:

[error] [sse-client] Agent failed {
  "error":"Snapshot mode (LOBU_SESSION_STORE != 'file') but WorkerConfig.runId is missing
           — runs-queue dispatch did not stamp runId on the job payload"
}

The gateway sets data.runId (message-consumer.ts:149) and data.runJobToken (line 185) correctly. job-router writes the whole payload to SSE. The worker reads payload.runId / payload.runJobToken in payloadToWorkerConfig (sse-client.ts:925-935).
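The worker-side half of that chain can be sketched as below. This is a minimal sketch, not the code in sse-client.ts: the `JobPayload` and `WorkerConfig` shapes are trimmed down and hypothetical (only `runId` and `runJobToken` come from this PR), and `assertSnapshotReady` is a hypothetical stand-in for the PR #865 startup assertion.

```typescript
// Hypothetical, trimmed-down shapes; the real MessagePayload and
// WorkerConfig carry many more fields.
interface JobPayload {
  runId?: number;
  runJobToken?: string;
  [key: string]: unknown;
}

interface WorkerConfig {
  runId?: number;
  runJobToken?: string;
}

// Sketch of the mapping: copy the run fields straight through.
// When the payload omits them (legacy direct-enqueue path), they stay undefined.
function payloadToWorkerConfig(payload: JobPayload): WorkerConfig {
  return {
    runId: payload.runId,
    runJobToken: payload.runJobToken,
  };
}

// Hypothetical stand-in for the snapshot-mode startup assertion:
// any session store other than "file" requires a runId.
function assertSnapshotReady(config: WorkerConfig, sessionStore: string): void {
  if (sessionStore !== "file" && config.runId === undefined) {
    throw new Error("Snapshot mode but WorkerConfig.runId is missing");
  }
}

// Usage: a runs-queue dispatch carries runId; a legacy enqueue does not.
const fromRunsQueue = payloadToWorkerConfig({ runId: 12345, runJobToken: "tok" });
const fromLegacyEnqueue = payloadToWorkerConfig({});
assertSnapshotReady(fromRunsQueue, "snapshot"); // does not throw
```

If anything between the gateway and this function drops `runId`, the worker ends up in the `fromLegacyEnqueue` state even for runs-queue jobs, which is exactly the condition the assertion rejects in snapshot mode.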

The dropped link was JobEventSchema. Its inner payload was a plain z.object(...), and zod's default object mode strips unknown keys, so runId and runJobToken were silently removed at safeParse. payload.runId therefore always reached the worker as undefined, and the assertion fired on every message.
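The strip behavior can be reproduced in isolation. A minimal sketch, assuming zod's `z.object` with default unknown-key handling; the single declared field `message` is a hypothetical stand-in for the real JobEventSchema payload fields. (Calling `.strict()` on the object would instead make the parse fail on the undeclared keys rather than silently dropping them.)

```typescript
import { z } from "zod";

// Pre-fix shape (sketch): runId and runJobToken are not declared,
// and z.object() strips undeclared keys by default.
const PayloadBefore = z.object({
  message: z.string(),
});

// What the gateway actually sends: declared field plus the run fields.
const incoming = { message: "hi", runId: 12345, runJobToken: "tok" };

// safeParse succeeds, but the parsed data only contains declared keys.
const result = PayloadBefore.safeParse(incoming);
if (result.success) {
  // Only `message` survives; runId and runJobToken were stripped.
  console.log(Object.keys(result.data));
}
```

Because the parse succeeds, nothing is logged or rejected; downstream code simply sees `undefined`, which is why the failure only surfaced at the worker's snapshot-mode assertion.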

Fix

  • Declare runId + runJobToken explicitly on the schema.
  • Add .passthrough() so future MessagePayload fields (mcpConfig, nixConfig, egressConfig, preApprovedTools, exec*, organizationId, networkConfig, ...) don't regress the same way.

The diff is 5 lines of real logic; the rest is the schema and comments.
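The shape of the fix can be sketched as follows. This is an illustration under assumptions, not the merged schema: `JobPayloadSchema` is a hypothetical name for the inner payload of JobEventSchema, `message` stands in for its existing declared fields, and `mcpConfig` is used only as an example of an undeclared gateway key.

```typescript
import { z } from "zod";

// Sketch of the fixed inner payload schema. runId and runJobToken are
// declared explicitly (optional, so legacy payloads still parse).
const JobPayloadSchema = z
  .object({
    message: z.string(),
    runId: z.number().optional(),
    runJobToken: z.string().optional(),
  })
  // Keep undeclared gateway keys instead of stripping them.
  .passthrough();

// Runs-queue payload: run fields plus an undeclared key.
const withRun = JobPayloadSchema.parse({
  message: "hi",
  runId: 12345,
  runJobToken: "tok",
  mcpConfig: { servers: [] },
});

// Legacy direct-enqueue payload: no run fields at all.
const legacy = JobPayloadSchema.parse({ message: "hi" });

console.log(withRun.runId, withRun.runJobToken, legacy.runId);
```

Declaring the fields gives them types and validation; `.passthrough()` is the safety net for fields that have not been declared yet. The trade-off is that passthrough keys arrive unvalidated, so consumers should still treat them as `unknown`.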

Reproducer

Pre-fix — revert the schema change, run the new test:

expect(received).toBe(expected)
Expected: 12345
Received: undefined
  at packages/agent-worker/src/__tests__/sse-client.test.ts:93:29
(fail) propagates runId and runJobToken from job payload to handleThreadMessage
 4 pass, 1 fail

That undefined is exactly the dropped field that fires the prod assertion.

Post-fix — same test suite, schema restored:

5 pass
0 fail
Ran 5 tests across 1 file. [284.00ms]

Test plan

  • New regression test exercises the actual handleEvent("job", ...) parse path with runId + runJobToken and asserts they reach handleThreadMessage — fails pre-fix, passes post-fix.
  • New test pins payloadToWorkerConfig mapping into WorkerConfig.
  • New test confirms legacy direct-enqueue path (no runId) still threads undefined cleanly — backwards-compat preserved.
  • make typecheck clean.
  • make build-packages clean.
  • Post-merge: send a Telegram message in prod, verify a row appears in agent_transcript_snapshot for that run.

Summary by CodeRabbit

  • Tests

    • Added comprehensive test coverage for job event handling, verifying payload field propagation and worker configuration initialization for both new and legacy job enqueue paths.
  • Bug Fixes

    • Improved job event payload validation to include optional fields and preserve additional gateway-provided keys during parsing.

PR #871 flipped the LOBU_SESSION_STORE default to snapshot mode. PR #865
added a startup assertion that makes the worker throw if snapshot mode is
on but WorkerConfig.runId is missing. Together those broke every Telegram
chat in prod with:

  "Snapshot mode (LOBU_SESSION_STORE != 'file') but WorkerConfig.runId
   is missing — runs-queue dispatch did not stamp runId on the job
   payload"

The gateway-side MessageConsumer correctly sets data.runId (line 149) and
data.runJobToken (line 185) before dispatch. job-router writes the full
payload to SSE. The worker reads payload.runId / payload.runJobToken in
payloadToWorkerConfig (sse-client.ts:925-935).

The missing link was JobEventSchema. Its inner payload object used a
plain z.object(...), which strips unknown keys by default, so runId and
runJobToken were silently dropped at safeParse. payload.runId was always
undefined and the assertion fired on every message.

Fix: declare runId + runJobToken explicitly on the schema, and add
.passthrough() so future MessagePayload fields (mcpConfig, nixConfig,
egressConfig, preApprovedTools, exec*, organizationId, networkConfig…)
don't regress the same way.

Tests:
- new regression test feeds a job event with runId + runJobToken through
  handleEvent and asserts they reach handleThreadMessage (pre-fix:
  undefined; post-fix: preserved)
- new test pins payloadToWorkerConfig's mapping of runId/runJobToken
- new test confirms the legacy direct-enqueue path (no runId) still
  threads undefined cleanly
@coderabbitai

coderabbitai Bot commented May 18, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: e9df6841-ddd0-4796-ae78-d699714541ec

📥 Commits

Reviewing files that changed from the base of the PR and between 230d7bc and 692db03.

📒 Files selected for processing (2)
  • packages/agent-worker/src/__tests__/sse-client.test.ts
  • packages/agent-worker/src/gateway/sse-client.ts

📝 Walkthrough

The PR adds support for propagating runId and runJobToken from SSE job payloads into worker configuration. The SSE schema is updated to validate these optional fields and preserve unknown gateway keys, then three test cases verify the fields flow correctly through event handling and config mapping while maintaining backwards compatibility.

Changes

RunId and RunJobToken Propagation

Layer | File(s) | Summary
--- | --- | ---
SSE payload schema contract update | packages/agent-worker/src/gateway/sse-client.ts | JobEventSchema now explicitly declares optional runId (number) and runJobToken (string) fields within the payload z.object(...), and uses .passthrough() to preserve unknown keys from the gateway instead of dropping them.
Test coverage for runId and runJobToken propagation | packages/agent-worker/src/__tests__/sse-client.test.ts | Three tests verify that runId and runJobToken propagate from the SSE payload through handleEvent to handleThreadMessage, that payloadToWorkerConfig threads them into WorkerConfig, and that the mapping preserves undefined values for backwards compatibility when the fields are absent.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Poem

A rabbit hops through payloads wide,
With runId and token at its side,
Schema blessed with .passthrough() grace,
Three tests keep the old code in place! 🐰

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

ESLint skipped: no ESLint configuration detected in root package.json. To enable, add eslint to devDependencies.


@codecov-commenter

Codecov Report

✅ All modified and coverable lines are covered by tests.

@buremba buremba merged commit d6b3b68 into main May 18, 2026
18 of 20 checks passed
@buremba buremba deleted the fix/dispatch-propagate-run-fields branch May 18, 2026 14:42
buremba added a commit that referenced this pull request May 18, 2026
…ects for new runs

Live prod bug — third in the Phase 5 chain. Snapshot mode is default,
worker correctly POSTs to /worker/transcript/snapshot with the right
runId (PR #874), but the gateway's isRunOwnedByJwtScope verifier
rejects with 403 on every call because `runs.action_input` is stored
as a JSONB **string** (double-encoded), not a JSONB object. The
verifier's `->> 'agentId'` returns NULL on a JSONB string, so the
scope comparison fails.

Root cause traced to runs-queue.ts:309 — `JSON.stringify(data)` was
bound to a `$4::jsonb` parameter, which Postgres ingested as a JSONB
string scalar. Fixed by passing the object through postgres-js's
`sql.json()` helper so the driver sends a proper JSONB object.

Two-part fix:
- Verifier (transcript-routes.ts): CASE jsonb_typeof to handle both
  shapes — object rows use direct `->>`, string rows unwrap via
  `(action_input #>> '{}')::jsonb`. New rows written after the fix always take the
  'object' branch; legacy in-flight string rows authorize correctly
  during the deploy crossover window.
- Dispatch (runs-queue.ts): write JSONB objects directly via sql.json
  going forward. New chat_message / task rows store proper objects.

Tests cover both shapes in the verifier and the new dispatch shape.
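The two stored shapes and the verifier's branching can be modeled in TypeScript. This is a rough model, not the actual verifier (which is SQL in transcript-routes.ts): a JSONB object is represented as a plain JS object, a double-encoded JSONB string as a JS string holding JSON text. `agentIdOf`, `naiveAgentIdOf`, and the sample row contents are hypothetical.

```typescript
// A stored action_input is either a JSONB object or a JSONB string
// scalar whose contents are JSON text (the double-encoded case).
type ActionInput = Record<string, unknown> | string;

// Hypothetical dispatch data.
const data = { agentId: "agent-1", kind: "chat_message" };

// Pre-fix rows: the stored value is a string holding the JSON text.
const legacyRow: ActionInput = JSON.stringify(data);
// Post-fix rows (sql.json): the stored value is the object itself.
const newRow: ActionInput = data;

// Pre-fix verifier model: `->> 'agentId'` on a JSONB string is NULL.
function naiveAgentIdOf(actionInput: ActionInput): string | null {
  if (typeof actionInput === "string") return null;
  const value = actionInput["agentId"];
  return typeof value === "string" ? value : null;
}

// Fixed verifier model, mirroring CASE jsonb_typeof(action_input):
//   'object' -> read the key directly
//   'string' -> unwrap the text and parse it, like (action_input #>> '{}')::jsonb
function agentIdOf(actionInput: ActionInput): string | null {
  const obj =
    typeof actionInput === "string"
      ? (JSON.parse(actionInput) as Record<string, unknown>)
      : actionInput;
  const value = obj["agentId"];
  return typeof value === "string" ? value : null;
}

console.log(naiveAgentIdOf(legacyRow), agentIdOf(legacyRow), agentIdOf(newRow));
```

Handling both shapes in the verifier is what lets legacy string rows authorize during the deploy crossover; once no string rows remain in flight, the 'string' branch is dead code.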
buremba added a commit that referenced this pull request May 18, 2026
…ects for new runs (#877)

* fix(server): handle action_input JSONB-string shape + write JSONB objects for new runs

* chore(submodule): bump owletto to clear drift check

2 participants